// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of K9s

package dao

import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"sync"
	"time"

	"github.com/cenkalti/backoff/v4"
	"github.com/derailed/k9s/internal"
	"github.com/derailed/k9s/internal/client"
	"github.com/derailed/k9s/internal/render"
	"github.com/derailed/k9s/internal/slogs"
	"github.com/derailed/k9s/internal/watch"
	"github.com/derailed/tview"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	restclient "k8s.io/client-go/rest"
	mv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
)

var (
	_ Accessor        = (*Pod)(nil)
	_ Nuker           = (*Pod)(nil)
	_ Loggable        = (*Pod)(nil)
	_ Controller      = (*Pod)(nil)
	_ ContainsPodSpec = (*Pod)(nil)
	_ ImageLister     = (*Pod)(nil)
)

type streamResult int

const (
	logRetryCount                  = 20
	logBackoffInitial              = 500 * time.Millisecond
	logBackoffMax                  = 30 * time.Second
	logChannelBuffer               = 50   // Buffer size for log channel to reduce drops
	streamEOF         streamResult = iota // legit container log close (no retry)
	streamError                           // retryable error (network, auth, etc.)
	streamCanceled                        // context canceled
)

// Pod represents a pod resource.
type Pod struct {
	Resource
}

// shouldStopRetrying checks if we should stop retrying log streaming based on pod status.
func (p *Pod) shouldStopRetrying(path string) bool {
	pod, err := p.GetInstance(path)
	if err != nil {
		return true
	}

	if pod.DeletionTimestamp != nil {
		return true
	}

	switch pod.Status.Phase {
	case v1.PodSucceeded, v1.PodFailed:
		return true
	default:
		return false
	}
}

// Get returns a resource instance if found, else an error.
func (p *Pod) Get(ctx context.Context, path string) (runtime.Object, error) {
	o, err := p.Resource.Get(ctx, path)
	if err != nil {
		return o, err
	}

	u, ok := o.(*unstructured.Unstructured)
	if !ok {
		return nil, fmt.Errorf("expecting *unstructured.Unstructured but got `%T", o)
	}

	var pmx *mv1beta1.PodMetrics
	if withMx, ok := ctx.Value(internal.KeyWithMetrics).(bool); ok && withMx {
		pmx, _ = client.DialMetrics(p.Client()).FetchPodMetrics(ctx, path)
	}

	return &render.PodWithMetrics{Raw: u, MX: pmx}, nil
}

// ListImages lists container images.
func (p *Pod) ListImages(_ context.Context, path string) ([]string, error) {
	pod, err := p.GetInstance(path)
	if err != nil {
		return nil, err
	}

	return render.ExtractImages(&pod.Spec), nil
}

// List returns a collection of nodes.
func (p *Pod) List(ctx context.Context, ns string) ([]runtime.Object, error) {
	oo, err := p.Resource.List(ctx, ns)
	if err != nil {
		return oo, err
	}

	var pmx client.PodsMetricsMap
	if withMx, ok := ctx.Value(internal.KeyWithMetrics).(bool); ok && withMx {
		pmx, _ = client.DialMetrics(p.Client()).FetchPodsMetricsMap(ctx, ns)
	}
	sel, _ := ctx.Value(internal.KeyFields).(string)
	fsel, err := labels.ConvertSelectorToLabelsMap(sel)
	if err != nil {
		return nil, err
	}
	nodeName := fsel["spec.nodeName"]

	res := make([]runtime.Object, 0, len(oo))
	for _, o := range oo {
		u, ok := o.(*unstructured.Unstructured)
		if !ok {
			return res, fmt.Errorf("expecting *unstructured.Unstructured but got `%T", o)
		}
		fqn := extractFQN(o)
		if nodeName == "" {
			res = append(res, &render.PodWithMetrics{Raw: u, MX: pmx[fqn]})
			continue
		}

		spec, ok := u.Object["spec"].(map[string]any)
		if !ok {
			return res, fmt.Errorf("expecting interface map but got `%T", o)
		}
		if spec["nodeName"] == nodeName {
			res = append(res, &render.PodWithMetrics{Raw: u, MX: pmx[fqn]})
		}
	}

	return res, nil
}

// Logs fetch container logs for a given pod and container.
func (p *Pod) Logs(path string, opts *v1.PodLogOptions) (*restclient.Request, error) {
	ns, n := client.Namespaced(path)
	auth, err := p.Client().CanI(ns, client.NewGVR(client.PodGVR.String()+":log"), n, client.GetAccess)
	if err != nil {
		return nil, err
	}
	if !auth {
		return nil, fmt.Errorf("user is not authorized to view pod logs")
	}

	dial, err := p.Client().DialLogs()
	if err != nil {
		return nil, err
	}

	return dial.CoreV1().Pods(ns).GetLogs(n, opts), nil
}

// Containers returns all container names on pod.
func (p *Pod) Containers(path string, includeInit bool) ([]string, error) {
	pod, err := p.GetInstance(path)
	if err != nil {
		return nil, err
	}

	cc := make([]string, 0, len(pod.Spec.Containers)+len(pod.Spec.InitContainers))
	for i := range pod.Spec.Containers {
		cc = append(cc, pod.Spec.Containers[i].Name)
	}

	if includeInit {
		for i := range pod.Spec.InitContainers {
			cc = append(cc, pod.Spec.InitContainers[i].Name)
		}
	}

	return cc, nil
}

// Pod returns a pod victim by name.
func (*Pod) Pod(fqn string) (string, error) {
	return fqn, nil
}

// GetInstance returns a pod instance.
func (p *Pod) GetInstance(fqn string) (*v1.Pod, error) {
	o, err := p.getFactory().Get(p.gvr, fqn, true, labels.Everything())
	if err != nil {
		return nil, err
	}

	var pod v1.Pod
	err = runtime.DefaultUnstructuredConverter.FromUnstructured(o.(*unstructured.Unstructured).Object, &pod)
	if err != nil {
		return nil, err
	}

	return &pod, nil
}

// TailLogs tails a given container logs.
func (p *Pod) TailLogs(ctx context.Context, opts *LogOptions) ([]LogChan, error) {
	fac, ok := ctx.Value(internal.KeyFactory).(*watch.Factory)
	if !ok {
		return nil, errors.New("no factory in context")
	}
	o, err := fac.Get(p.gvr, opts.Path, true, labels.Everything())
	if err != nil {
		return nil, err
	}
	var po v1.Pod
	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(o.(*unstructured.Unstructured).Object, &po); err != nil {
		return nil, err
	}
	coCounts := len(po.Spec.InitContainers) + len(po.Spec.Containers) + len(po.Spec.EphemeralContainers)
	if coCounts == 1 {
		opts.SingleContainer = true
	}

	outs := make([]LogChan, 0, coCounts)
	if co, ok := GetDefaultContainer(&po.ObjectMeta, &po.Spec); ok && !opts.AllContainers {
		opts.DefaultContainer = co
		return append(outs, tailLogs(ctx, p, opts)), nil
	}
	if opts.HasContainer() && !opts.AllContainers {
		return append(outs, tailLogs(ctx, p, opts)), nil
	}
	for i := range po.Spec.InitContainers {
		cfg := opts.Clone()
		cfg.Container = po.Spec.InitContainers[i].Name
		outs = append(outs, tailLogs(ctx, p, cfg))
	}
	for i := range po.Spec.Containers {
		cfg := opts.Clone()
		cfg.Container = po.Spec.Containers[i].Name
		outs = append(outs, tailLogs(ctx, p, cfg))
	}
	for i := range po.Spec.EphemeralContainers {
		cfg := opts.Clone()
		cfg.Container = po.Spec.EphemeralContainers[i].Name
		outs = append(outs, tailLogs(ctx, p, cfg))
	}

	return outs, nil
}

// ScanSA scans for ServiceAccount refs.
func (p *Pod) ScanSA(_ context.Context, fqn string, wait bool) (Refs, error) {
	ns, n := client.Namespaced(fqn)
	oo, err := p.getFactory().List(p.gvr, ns, wait, labels.Everything())
	if err != nil {
		return nil, err
	}

	refs := make(Refs, 0, len(oo))
	for _, o := range oo {
		var pod v1.Pod
		err = runtime.DefaultUnstructuredConverter.FromUnstructured(o.(*unstructured.Unstructured).Object, &pod)
		if err != nil {
			return nil, errors.New("expecting Deployment resource")
		}
		// Just pick controller less pods...
		if len(pod.OwnerReferences) > 0 {
			continue
		}
		if serviceAccountMatches(pod.Spec.ServiceAccountName, n) {
			refs = append(refs, Ref{
				GVR: p.GVR(),
				FQN: client.FQN(pod.Namespace, pod.Name),
			})
		}
	}

	return refs, nil
}

// Scan scans for cluster resource refs.
func (p *Pod) Scan(_ context.Context, gvr *client.GVR, fqn string, wait bool) (Refs, error) {
	ns, n := client.Namespaced(fqn)
	oo, err := p.getFactory().List(p.gvr, ns, wait, labels.Everything())
	if err != nil {
		return nil, err
	}

	refs := make(Refs, 0, len(oo))
	for _, o := range oo {
		var pod v1.Pod
		err = runtime.DefaultUnstructuredConverter.FromUnstructured(o.(*unstructured.Unstructured).Object, &pod)
		if err != nil {
			return nil, errors.New("expecting Pod resource")
		}
		// Just pick controller less pods...
		if len(pod.OwnerReferences) > 0 {
			continue
		}
		switch gvr {
		case client.CmGVR:
			if !hasConfigMap(&pod.Spec, n) {
				continue
			}
			refs = append(refs, Ref{
				GVR: p.GVR(),
				FQN: client.FQN(pod.Namespace, pod.Name),
			})
		case client.SecGVR:
			found, err := hasSecret(p.Factory, &pod.Spec, pod.Namespace, n, wait)
			if err != nil {
				slog.Warn("Locate secret failed",
					slogs.FQN, fqn,
					slogs.Error, err,
				)
				continue
			}
			if !found {
				continue
			}
			refs = append(refs, Ref{
				GVR: p.GVR(),
				FQN: client.FQN(pod.Namespace, pod.Name),
			})
		case client.PvcGVR:
			if !hasPVC(&pod.Spec, n) {
				continue
			}
			refs = append(refs, Ref{
				GVR: p.GVR(),
				FQN: client.FQN(pod.Namespace, pod.Name),
			})
		case client.PcGVR:
			if !hasPC(&pod.Spec, n) {
				continue
			}
			refs = append(refs, Ref{
				GVR: p.GVR(),
				FQN: client.FQN(pod.Namespace, pod.Name),
			})
		}
	}

	return refs, nil
}

// ----------------------------------------------------------------------------
// Helpers...

func tailLogs(ctx context.Context, logger Logger, opts *LogOptions) LogChan {
	out := make(LogChan, logChannelBuffer)
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		podOpts := opts.ToPodLogOptions()

		// Setup exponential backoff following project pattern
		bf := backoff.NewExponentialBackOff()
		bf.InitialInterval = logBackoffInitial
		bf.MaxElapsedTime = 0
		bf.MaxInterval = logBackoffMax / 2
		backoffCtx := backoff.WithContext(bf, ctx)
		delay := logBackoffInitial

		for range logRetryCount {
			req, err := logger.Logs(opts.Path, podOpts)
			if err != nil {
				slog.Error("Log request failed",
					slogs.Container, opts.Info(),
					slogs.Error, err,
				)
				// Check if we should stop retrying based on pod status
				if pod, ok := logger.(*Pod); ok && pod.shouldStopRetrying(opts.Path) {
					slog.Debug("Stopping log retry - pod is terminating or deleted",
						slogs.Container, opts.Info(),
					)
					return
				}
				select {
				case <-ctx.Done():
					return
				case <-time.After(delay):
					if delay = backoffCtx.NextBackOff(); delay == backoff.Stop {
						return
					}
				}
				continue
			}

			stream, e := req.Stream(ctx)
			if e != nil {
				slog.Error("Stream logs failed",
					slogs.Error, e,
					slogs.Container, opts.Info(),
				)
				// Check if we should stop retrying based on pod status
				if pod, ok := logger.(*Pod); ok && pod.shouldStopRetrying(opts.Path) {
					slog.Debug("Stopping log retry - pod is terminating or deleted",
						slogs.Container, opts.Info(),
					)
					return
				}
				select {
				case <-ctx.Done():
					return
				case <-time.After(delay):
					if delay = backoffCtx.NextBackOff(); delay == backoff.Stop {
						return
					}
				}
				continue
			}

			// Process logs until completion
			result := readLogs(ctx, stream, out, opts)
			switch result {
			case streamEOF:
				slog.Debug("Log stream ended cleanly",
					slogs.Container, opts.Info(),
				)
				return
			case streamError:
				// Check if we should stop retrying based on pod status
				if pod, ok := logger.(*Pod); ok && pod.shouldStopRetrying(opts.Path) {
					slog.Debug("Stopping log retry after stream error - pod is terminating or deleted",
						slogs.Container, opts.Info(),
					)
					return
				}
				slog.Debug("Log stream error, retrying",
					slogs.Container, opts.Info(),
				)
				select {
				case <-ctx.Done():
					return
				case <-time.After(delay):
					if delay = backoffCtx.NextBackOff(); delay == backoff.Stop {
						return
					}
				}
				continue
			case streamCanceled:
				return
			}

			// Reset backoff and delay on successful connection
			bf.Reset()
			delay = logBackoffInitial
		}

		// Out of retries
		out <- opts.ToErrLogItem(fmt.Errorf("failed to maintain log stream after %d retries", logRetryCount))
	}()

	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func readLogs(ctx context.Context, stream io.ReadCloser, out chan<- *LogItem, opts *LogOptions) streamResult {
	defer func() {
		if err := stream.Close(); err != nil && !errors.Is(err, io.ErrClosedPipe) {
			slog.Error("Failed to close stream",
				slogs.Container, opts.Info(),
				slogs.Error, err,
			)
		}
	}()

	r := bufio.NewReader(stream)

	for {
		bytes, err := r.ReadBytes('\n')
		if err == nil {
			item := opts.ToLogItem(tview.EscapeBytes(bytes))
			select {
			case <-ctx.Done():
				return streamCanceled
			case out <- item:
			default:
				// Avoid deadlock if consumer is too slow
				slog.Warn("Dropping log line due to slow consumer",
					slogs.Container, opts.Info(),
				)
			}
			continue
		}

		if errors.Is(err, io.EOF) {
			if len(bytes) > 0 {
				// Emit trailing partial line before EOF
				out <- opts.ToLogItem(tview.EscapeBytes(bytes))
			}
			slog.Debug("Log reader reached EOF", slogs.Container, opts.Info())
			out <- opts.ToErrLogItem(fmt.Errorf("stream closed: %w for %s", err, opts.Info()))
			return streamEOF
		}

		// Non-EOF error
		slog.Debug("Log stream error, will retry connection",
			slogs.Container, opts.Info(),
			slogs.Error, fmt.Errorf("stream error: %w for %s", err, opts.Info()),
		)
		// Don't send stream errors to user - they will be retried
		// Only final retry exhaustion message is shown
		return streamError
	}
}

// MetaFQN returns a fully qualified resource name.
func MetaFQN(m *metav1.ObjectMeta) string {
	if m.Namespace == "" {
		return m.Name
	}

	return FQN(m.Namespace, m.Name)
}

// GetPodSpec returns a pod spec given a resource.
func (p *Pod) GetPodSpec(path string) (*v1.PodSpec, error) {
	pod, err := p.GetInstance(path)
	if err != nil {
		return nil, err
	}
	podSpec := pod.Spec

	return &podSpec, nil
}

// SetImages sets container images.
func (p *Pod) SetImages(ctx context.Context, path string, imageSpecs ImageSpecs) error {
	ns, n := client.Namespaced(path)
	auth, err := p.Client().CanI(ns, p.gvr, n, client.PatchAccess)
	if err != nil {
		return err
	}
	if !auth {
		return fmt.Errorf("user is not authorized to patch a deployment")
	}
	manager, isManaged, err := p.isControlled(path)
	if err != nil {
		return err
	}
	if isManaged {
		return fmt.Errorf("unable to set image. This pod is managed by %s. Please set the image on the controller", manager)
	}
	jsonPatch, err := GetJsonPatch(imageSpecs)
	if err != nil {
		return err
	}
	dial, err := p.Client().Dial()
	if err != nil {
		return err
	}
	_, err = dial.CoreV1().Pods(ns).Patch(
		ctx,
		n,
		types.StrategicMergePatchType,
		jsonPatch,
		metav1.PatchOptions{},
	)

	return err
}

func (p *Pod) isControlled(path string) (fqn string, ok bool, err error) {
	pod, err := p.GetInstance(path)
	if err != nil {
		return "", false, err
	}
	references := pod.GetObjectMeta().GetOwnerReferences()
	if len(references) > 0 {
		return fmt.Sprintf("%s/%s", references[0].Kind, references[0].Name), true, nil
	}

	return "", false, nil
}

var toastPhases = sets.New(
	render.PhaseCompleted,
	render.PhasePending,
	render.PhaseCrashLoop,
	render.PhaseError,
	render.PhaseImagePullBackOff,
	render.PhaseContainerStatusUnknown,
	render.PhaseEvicted,
	render.PhaseOOMKilled,
)

func (p *Pod) Sanitize(ctx context.Context, ns string) (int, error) {
	oo, err := p.Resource.List(ctx, ns)
	if err != nil {
		return 0, err
	}

	var count int
	for _, o := range oo {
		u, ok := o.(*unstructured.Unstructured)
		if !ok {
			continue
		}
		var pod v1.Pod
		err = runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &pod)
		if err != nil {
			continue
		}

		if toastPhases.Has(render.PodStatus(&pod)) {
			// !!BOZO!! Might need to bump timeout otherwise rev limit if too many??
			fqn := client.FQN(pod.Namespace, pod.Name)
			slog.Debug("Sanitizing resource", slogs.FQN, fqn)
			if err := p.Delete(ctx, fqn, nil, 0); err != nil {
				slog.Debug("Aborted! Sanitizer delete failed",
					slogs.FQN, fqn,
					slogs.Count, count,
					slogs.Error, err,
				)
				return count, err
			}
			count++
		}
	}
	slog.Debug("Sanitizer deleted pods", slogs.Count, count)

	return count, nil
}
